Skip to content

fix: add a timeout to RedisSteamBroker xautoclaim lock to prevent infinite locking#107

Merged
spikeninja merged 1 commit intotaskiq-python:mainfrom
FlorianLB:fix-autoclaim-infinite-locking
Feb 3, 2026
Merged

fix: add a timeout to RedisSteamBroker xautoclaim lock to prevent infinite locking#107
spikeninja merged 1 commit intotaskiq-python:mainfrom
FlorianLB:fix-autoclaim-infinite-locking

Conversation

@FlorianLB
Copy link
Contributor

@FlorianLB FlorianLB commented Nov 10, 2025

If for any reasons the worker crashes or get killed when holding the lock, the lock will never be released. It happened to us when a message make the container running out-of-memory, killing the worker right away and never releasing the lock.

This PR adds a timeout to at least release the lock at some point.

@FlorianLB FlorianLB force-pushed the fix-autoclaim-infinite-locking branch from f03a4a3 to 3b690f2 Compare November 10, 2025 18:38
@FlorianLB
Copy link
Contributor Author

@s3rius is something missing to merge this one ?

We have prod issues are happening again and again because of that.

@FlorianLB FlorianLB force-pushed the fix-autoclaim-infinite-locking branch 2 times, most recently from e708799 to 4cd0d63 Compare January 13, 2026 21:02
@FlorianLB
Copy link
Contributor Author

anyone ? @danfimov @spikeninja ?

@spikeninja
Copy link
Contributor

spikeninja commented Feb 3, 2026

Add a test pls to test_broker.py
you can use this one or come up with your own

@pytest.mark.anyio
async def test_unacknowledged_lock_timeout_in_stream_broker(
    redis_url: str,
    valid_broker_message: BrokerMessage,
) -> None:
    unacknowledged_lock_timeout = 1
    queue_name = uuid.uuid4().hex
    consumer_group_name = uuid.uuid4().hex

    broker = RedisStreamBroker(
        url=redis_url,
        approximate=False,
        queue_name=queue_name,
        consumer_group_name=consumer_group_name,
        unacknowledged_lock_timeout=unacknowledged_lock_timeout,
    )

    await broker.startup()
    await broker.kick(valid_broker_message)

    message = await get_message(broker)
    assert isinstance(message, AckableMessage)
    assert message.data == valid_broker_message.message

    async with Redis(connection_pool=broker.connection_pool) as redis:
        lock_key = f"autoclaim:{consumer_group_name}:{queue_name}"
        await redis.exists(lock_key)
        await asyncio.sleep(unacknowledged_lock_timeout + 0.5)

        lock_exists_after_timeout = await redis.exists(lock_key)
        assert lock_exists_after_timeout == 0, "Lock should be released after timeout"

    await broker.shutdown()

@FlorianLB FlorianLB force-pushed the fix-autoclaim-infinite-locking branch from 4cd0d63 to 500ed50 Compare February 3, 2026 14:24
@FlorianLB
Copy link
Contributor Author

Add a test pls to test_broker.py you can use this one or come up with your own

Unfortunately your test doesn't cover the issue. The bug reproduction is quite complex : concurrent broker, crash, etc

@spikeninja
Copy link
Contributor

spikeninja commented Feb 3, 2026

Add a test pls to test_broker.py you can use this one or come up with your own

Unfortunately your test doesn't cover the issue. The bug reproduction is quite complex : concurrent broker, crash, etc

it should not cover your bug, it should cover at least the new field existence and check that the lock is properly acquired and released

@spikeninja
Copy link
Contributor

spikeninja commented Feb 3, 2026

or come up with your own

feel free to add your tests as i've mentioned here

@FlorianLB FlorianLB force-pushed the fix-autoclaim-infinite-locking branch from 500ed50 to 04836a6 Compare February 3, 2026 14:49
@FlorianLB FlorianLB force-pushed the fix-autoclaim-infinite-locking branch from 04836a6 to e997423 Compare February 3, 2026 14:52
@FlorianLB
Copy link
Contributor Author

Nullable typing + basic test ✅

@spikeninja spikeninja merged commit 374c789 into taskiq-python:main Feb 3, 2026
7 checks passed
@FlorianLB
Copy link
Contributor Author

Thx a lot @spikeninja 🙏

Do you think you can release soon ?

@spikeninja
Copy link
Contributor

Thx a lot @spikeninja 🙏

Do you think you can release soon ?

https://github.com/taskiq-python/taskiq-redis/releases/tag/1.2.2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants